package com.jeesuite.admin.controller.admin;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;

import com.jeesuite.admin.dao.entity.MonitorServerEntity;
import com.jeesuite.admin.dao.mapper.MonitorServerEntityMapper;
import com.jeesuite.admin.exception.JeesuiteBaseException;
import com.jeesuite.admin.model.SelectOption;
import com.jeesuite.admin.model.WrapperResponseEntity;
import com.jeesuite.admin.util.SecurityUtil;
import com.jeesuite.kafka.monitor.KafkaMonitor;
import com.jeesuite.kafka.monitor.model.BrokerInfo;
import com.jeesuite.kafka.monitor.model.ConsumerGroupInfo;
import com.jeesuite.kafka.monitor.model.ProducerStat;

/**
 * Admin endpoints under {@code /admin/kafka} exposing broker, consumer-group and
 * producer information obtained from a per-environment {@link KafkaMonitor}.
 *
 * <p>Monitors are created on first use for an environment and cached in a static
 * map keyed by the environment name.
 */
@Controller
@RequestMapping("/admin/kafka")
public class KafkaAdminController {
	
	/** Cache of monitors keyed by environment name; every access is guarded by the map's own lock. */
	private static final Map<String, KafkaMonitor> monitorInstances = new HashMap<>();
	
	private @Autowired MonitorServerEntityMapper monitorServerMapper;
	
	/**
	 * Returns the cached monitor for {@code env}, creating and caching it on first use.
	 *
	 * <p>Lookup and creation both happen while holding the lock on the cache so that
	 * concurrent first requests for the same environment build only one monitor and
	 * the non-thread-safe {@link HashMap} is never read while another thread writes it.
	 *
	 * @param env environment name used to look up the server configuration
	 * @return the monitor associated with {@code env}
	 * @throws JeesuiteBaseException (code 2001) if no {@code kafka:zookeeper} or
	 *         {@code kafka} server entry is configured for {@code env}
	 */
	private KafkaMonitor getKafkaMonitor(String env){
		synchronized (monitorInstances) {
			KafkaMonitor monitor = monitorInstances.get(env);
			if(monitor == null){
				MonitorServerEntity zkServer = monitorServerMapper.findByEnvAndMoudule(env, "kafka:zookeeper");
				MonitorServerEntity kafkaServer = monitorServerMapper.findByEnvAndMoudule(env, "kafka");
				if(zkServer == null) throw new JeesuiteBaseException(2001, "无["+env+"][kafka:zookeeper]配置");
				if(kafkaServer == null) throw new JeesuiteBaseException(2001, "无["+env+"][kafka]配置");
				// NOTE(review): meaning of the third argument (1000) is defined by KafkaMonitor — confirm there.
				monitor = new KafkaMonitor(zkServer.getServers(), kafkaServer.getServers(), 1000);
				// Cache under the requested environment (was hard-coded to "dev", which
				// defeated the cache and let environments overwrite each other's entry).
				monitorInstances.put(env, monitor);
			}
			return monitor;
		}
	}
	
	/**
	 * Lists all brokers reported by the monitor of the given environment.
	 *
	 * @param env environment name taken from the request path
	 * @return the brokers returned by {@link KafkaMonitor#getAllBrokers()}
	 */
	@RequestMapping(value = "brokers/{env}", method = RequestMethod.GET)
	public @ResponseBody List<BrokerInfo> getBrokerInfos(@PathVariable("env") String env){
		SecurityUtil.requireProfileGanted(env);
		return getKafkaMonitor(env).getAllBrokers();
	}
	
	/**
	 * Lists the group names known for the given environment as select options:
	 * first every consumer group, then every producer-stat key that is not already
	 * contained in the result.
	 *
	 * @param env environment name taken from the request path
	 * @return options whose two constructor arguments are both the group name
	 */
	@RequestMapping(value = "group/{env}", method = RequestMethod.GET)
	public @ResponseBody List<SelectOption> getGroups(@PathVariable("env") String env){
		
		SecurityUtil.requireProfileGanted(env);
		
		KafkaMonitor kafkaMonitor = getKafkaMonitor(env);
		
		List<SelectOption> result = new ArrayList<>();
		List<ConsumerGroupInfo> groupInfos = kafkaMonitor.getAllConsumerGroupInfos();
		for (ConsumerGroupInfo g : groupInfos) {
			result.add(new SelectOption(g.getGroupName(),g.getGroupName()));
		}
		
		Set<String> groups = kafkaMonitor.getAllProducerStats().keySet();
		for (String g : groups) {
			SelectOption opt = new SelectOption(g,g);
			// NOTE(review): de-duplication relies on SelectOption.equals — confirm it compares by value.
			if(result.contains(opt))continue;
			result.add(opt);
		}
		return result;
	}

	/**
	 * Returns consumer topic information and producer statistics for one group.
	 *
	 * <p>The response map contains the key {@code consumer} when a consumer group
	 * with the requested name exists, and the key {@code producer} when the monitor
	 * returns a non-empty list of producer stats for that name.
	 *
	 * @param params request body; reads the entries {@code env} and {@code groupName}
	 * @return HTTP 200 wrapping the result map
	 * @throws JeesuiteBaseException (code 1001) if {@code groupName} is blank
	 */
	@RequestMapping(value = "topicinfos", method = RequestMethod.POST)
	public ResponseEntity<WrapperResponseEntity> topicinfos(@RequestBody Map<String,String> params){

		Map<String,Object> result = new HashMap<>();
		String env = params.get("env");
		String groupName = params.get("groupName");
		if(StringUtils.isBlank(groupName))throw new JeesuiteBaseException(1001, "groupName不能为空");
		
		SecurityUtil.requireProfileGanted(env);
		
		KafkaMonitor kafkaMonitor = getKafkaMonitor(env);
		List<ConsumerGroupInfo> groupInfos = kafkaMonitor.getAllConsumerGroupInfos();
		for (ConsumerGroupInfo consumerGroup : groupInfos) {
			if(consumerGroup.getGroupName().equals(groupName)){
				result.put("consumer", consumerGroup.getTopics());
				break;
			}
		}
		
		List<ProducerStat> producerStats = kafkaMonitor.getProducerStats(groupName);
		if(producerStats != null && producerStats.size() > 0){
			result.put("producer", producerStats);
		}
		
		return new ResponseEntity<WrapperResponseEntity>(new WrapperResponseEntity(result),HttpStatus.OK);
	}
}
